# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from hysop.constants import PenalizationFormulation
from hysop.backend.host.host_operator import HostOperator
from hysop.tools.htypes import check_instance, InstanceOf
from hysop.tools.decorators import debug
from hysop.fields.continuous_field import Field
from hysop.parameters.scalar_parameter import ScalarParameter
from hysop.parameters.tensor_parameter import TensorParameter
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.methods import SpaceDiscretization
from hysop.core.memory.memory_request import MemoryRequest
from hysop.core.graph.graph import op_apply
from hysop.numerics.stencil.stencil_generator import (
StencilGenerator,
CenteredStencilGenerator,
MPQ,
)
import numpy as np
[docs]
class CommonPenalization:
""" """
__default_method = {SpaceDiscretization: SpaceDiscretization.FDC4}
__available_methods = {
SpaceDiscretization: (InstanceOf(SpaceDiscretization), InstanceOf(int))
}
[docs]
@classmethod
def default_method(cls):
dm = super().default_method()
dm.update(cls.__default_method)
return dm
[docs]
@classmethod
def available_methods(cls):
am = super().available_methods()
am.update(cls.__available_methods)
return am
@debug
def __new__(cls, **kwds):
return super().__new__(cls, **kwds)
@debug
def __init__(
self, obstacles, velocity, dt, coeff=None, ubar=None, formulation=None, **kwds
):
super().__init__(**kwds)
check_instance(dt, ScalarParameter)
check_instance(
coeff, (ScalarParameter, float, type(lambda x: x)), allow_none=True
)
check_instance(ubar, (TensorParameter, Field), allow_none=True)
check_instance(formulation, PenalizationFormulation, allow_none=True)
check_instance(
obstacles,
(tuple, dict),
values=Field,
keys=(ScalarParameter, float, type(lambda x: x)),
check_kwds=False,
)
if isinstance(coeff, ScalarParameter):
self.coeff = lambda o: coeff() * o
elif isinstance(coeff, type(lambda x: x)):
self.coeff = coeff
elif not coeff is None:
c = ScalarParameter("penal_coeff", initial_value=coeff)
self.coeff = lambda o: c() * o
if isinstance(obstacles, dict):
obs = {}
for c, o in obstacles.items():
if isinstance(c, ScalarParameter):
obs[lambda x: c() * x] = o
elif isinstance(c, type(lambda x: x)):
obs[c] = o
elif not c is None:
_ = ScalarParameter("penal_coeff", initial_value=c)
obs[lambda x: _() * x] = o
else:
obs = {}
for o in obstacles:
obs[self.coeff] = o
for o in obs.values():
assert o.nb_components == 1
self._ubar = ubar
if ubar is None:
self._ubar = TensorParameter(
name="ubar",
shape=(velocity.dim,),
quiet=True,
dtype=velocity.dtype,
initial_value=(0.0,) * velocity.dim,
)
if formulation is None or formulation is PenalizationFormulation.IMPLICIT:
self._compute_penalization = self._compute_penalization_implicit
else:
self._compute_penalization = self._compute_penalization_exact
self.obstacles = obs
def _compute_penalization_implicit(self):
dt = self.dt()
self.tmp[...] = 0.0
for c, o in self.dobstacles.items():
self.tmp[...] += (-dt) * c(o) / (1.0 + dt * c(o))
def _compute_penalization_exact(self):
dt = self.dt()
self.tmp[...] = 1.0
for c, o in self.dobstacles.items():
self.tmp[...] *= np.exp(-dt * c(o))
self.tmp[...] -= 1.0
[docs]
class PythonPenalizeVorticity(HostOperator, CommonPenalization):
""" """
__default_method = {SpaceDiscretization: SpaceDiscretization.FDC4}
__available_methods = {
SpaceDiscretization: (InstanceOf(SpaceDiscretization), InstanceOf(int))
}
[docs]
@classmethod
def default_method(cls):
dm = super().default_method()
dm.update(cls.__default_method)
return dm
[docs]
@classmethod
def available_methods(cls):
am = super().available_methods()
am.update(cls.__available_methods)
return am
@debug
def __new__(
cls,
obstacles,
variables,
velocity,
vorticity,
dt,
coeff=None,
ubar=None,
formulation=None,
**kwds,
):
return super().__new__(
cls, input_fields=None, output_fields=None, input_params=None, **kwds
)
@debug
def __init__(
self,
obstacles,
variables,
velocity,
vorticity,
dt,
coeff=None,
ubar=None,
formulation=None,
**kwds,
):
check_instance(velocity, Field)
check_instance(vorticity, Field)
check_instance(variables, dict, keys=Field, values=CartesianTopologyDescriptors)
input_fields = {velocity: variables[velocity], vorticity: variables[vorticity]}
output_fields = {vorticity: variables[vorticity]}
input_params = {dt}
for o in obstacles.values() if isinstance(obstacles, dict) else obstacles:
input_fields[o] = variables[o]
if isinstance(ubar, Field):
input_fields[ubar] = variables[ubar]
self.velocity = velocity
self.vorticity = vorticity
self.dt = dt
super().__init__(
input_fields=input_fields,
output_fields=output_fields,
input_params=input_params,
obstacles=obstacles,
velocity=velocity,
dt=dt,
coeff=coeff,
ubar=ubar,
formulation=formulation,
**kwds,
)
[docs]
@debug
def handle_method(self, method):
super().handle_method(method)
self.space_discretization = method.pop(SpaceDiscretization)
csg = CenteredStencilGenerator()
csg.configure(dtype=MPQ, dim=1)
try:
self.order = int(str(self.space_discretization)[3:])
except:
self.order = self.space_discretization
self.stencil = tuple(
csg.generate_exact_stencil(derivative=1, order=self.order) for _ in range(3)
)
[docs]
@debug
def get_field_requirements(self):
requirements = super().get_field_requirements()
stencil = self.stencil[0]
G = max(max(a, b) for a, b in zip(stencil.L, stencil.R))
for is_input, (field, td, req) in requirements.iter_requirements():
min_ghosts = (max(g, G) for g in req.min_ghosts.copy())
req.min_ghosts = min_ghosts
req.axes = (tuple(range(field.dim)),)
return requirements
[docs]
@debug
def discretize(self):
if self.discretized:
return
super().discretize()
dim = self.velocity.dim
self.dvelocity = self.input_discrete_tensor_fields[self.velocity]
if dim == 2:
self.dvorticity = self.input_discrete_fields[self.vorticity]
if dim == 3:
self.dvorticity = self.input_discrete_tensor_fields[self.vorticity]
dv, dw = self.dvelocity, self.dvorticity
stencil = self.stencil[0]
G = max(max(a, b) for a, b in zip(stencil.L, stencil.R))
view = dv.local_slices(ghosts=(G,) * dim)
V = tuple(Vi[view] for Vi in dv.buffers)
view = dw.local_slices(ghosts=(G,) * dim)
W = tuple(Wi[view] for Wi in dw.buffers)
self.W, self.V = W, V
self.dobstacles = {}
for c, o in self.obstacles.items():
o_df = self.input_discrete_fields[o]
self.dobstacles[c] = o_df.data[0][o_df.local_slices(ghosts=(G,) * dim)]
for s, dx in zip(self.stencil, dw.space_step):
s.replace_symbols({s.dx: dx})
self.w_ghost_exchanger = dw.build_ghost_exchanger()
self.v_ghost_exchanger = dv.build_ghost_exchanger()
if isinstance(self._ubar, TensorParameter):
self.get_ubar = lambda: self._ubar()[::-1]
else:
dubar = self.input_discrete_tensor_fields[self._ubar]
view = dubar.local_slices(ghosts=(G,) * dim)
uBar = tuple(Vi[view] for Vi in dubar.buffers)
self.get_ubar = lambda: uBar
[docs]
@debug
def get_work_properties(self):
requests = super().get_work_properties()
buffers = MemoryRequest.empty_like(
a=self.V[0], nb_components=4, backend=self.dvelocity.backend
)
requests.push_mem_request("Wtmp", buffers)
return requests
[docs]
def setup(self, work):
super().setup(work=work)
Wtmp = work.get_buffer(self, "Wtmp", handle=True)
self.tmp_v = Wtmp[0:3]
self.tmp = Wtmp[-1]
if self.velocity.dim == 2:
self._compute_vorticity = self._compute_vorticity_2d
if self.velocity.dim == 3:
self._compute_vorticity = self._compute_vorticity_3d
[docs]
@classmethod
def supported_dimensions(cls):
return (3, 2)
[docs]
@classmethod
def supports_mpi(cls):
return True
@op_apply
def apply(self, **kwds):
super().apply(**kwds)
self.v_ghost_exchanger()
# Penalize velocity
self._compute_penalization()
for ubar, v, tmp in zip(self.get_ubar(), self.V, self.tmp_v):
tmp[...] = (v - ubar) * self.tmp
self._compute_vorticity()
self.w_ghost_exchanger()
def _compute_vorticity_3d(self):
# compute penalized vorticity:
# (dvz/dy - dvy/dz)
# W = (dvx/dz - dvz/dx)
# (dvy/dx - dvx/dy)
# X direction
self.tmp = self.stencil[0](a=self.tmp_v[2], out=self.tmp, axis=2)
self.W[1][...] += -self.tmp
self.tmp = self.stencil[0](a=self.tmp_v[1], out=self.tmp, axis=2)
self.W[2][...] += self.tmp
# Y direction
self.tmp = self.stencil[1](a=self.tmp_v[0], out=self.tmp, axis=1)
self.W[2][...] += -self.tmp
self.tmp = self.stencil[1](a=self.tmp_v[2], out=self.tmp, axis=1)
self.W[0][...] += self.tmp
# Z direction
self.tmp = self.stencil[2](a=self.tmp_v[1], out=self.tmp, axis=0)
self.W[0][...] += -self.tmp
self.tmp = self.stencil[2](a=self.tmp_v[0], out=self.tmp, axis=0)
self.W[1][...] += self.tmp
def _compute_vorticity_2d(self):
# compute penalized vorticity:
# W = (dvy/dx - dvx/dy)
# X direction
self.tmp = self.stencil[0](a=self.tmp_v[1], out=self.tmp, axis=1)
self.W[0][...] += self.tmp
# Y direction
self.tmp = self.stencil[1](a=self.tmp_v[0], out=self.tmp, axis=0)
self.W[0][...] += -self.tmp
[docs]
class PythonPenalizeVelocity(HostOperator, CommonPenalization):
""" """
__default_method = {SpaceDiscretization: SpaceDiscretization.FDC4}
__available_methods = {
SpaceDiscretization: (InstanceOf(SpaceDiscretization), InstanceOf(int))
}
[docs]
@classmethod
def default_method(cls):
dm = super().default_method()
dm.update(cls.__default_method)
return dm
[docs]
@classmethod
def available_methods(cls):
am = super().available_methods()
am.update(cls.__available_methods)
return am
@debug
def __new__(
cls,
obstacles,
variables,
velocity,
dt,
coeff=None,
ubar=None,
formulation=None,
**kwds,
):
return super().__new__(
cls, input_fields=None, output_fields=None, input_params=None, **kwds
)
@debug
def __init__(
self,
obstacles,
variables,
velocity,
dt,
coeff=None,
ubar=None,
formulation=None,
**kwds,
):
check_instance(velocity, Field)
check_instance(variables, dict, keys=Field, values=CartesianTopologyDescriptors)
input_fields = {
velocity: variables[velocity],
}
output_fields = {velocity: variables[velocity]}
input_params = {dt}
for o in obstacles.values() if isinstance(obstacles, dict) else obstacles:
input_fields[o] = variables[o]
if isinstance(ubar, Field):
input_fields[ubar] = variables[ubar]
self.velocity = velocity
self.dt = dt
super().__init__(
input_fields=input_fields,
output_fields=output_fields,
input_params=input_params,
obstacles=obstacles,
velocity=velocity,
dt=dt,
coeff=coeff,
ubar=ubar,
formulation=formulation,
**kwds,
)
[docs]
@debug
def discretize(self):
if self.discretized:
return
super().discretize()
self.dvelocity = self.input_discrete_tensor_fields[self.velocity]
dv = self.dvelocity
self.dobstacles = {}
for c, o in self.obstacles.items():
o_df = self.input_discrete_fields[o]
self.dobstacles[c] = o_df.data[0]
self.v_ghost_exchanger = dv.build_ghost_exchanger()
if self.v_ghost_exchanger is None:
def _dummy_func():
pass
self.v_ghost_exchanger = _dummy_func
if isinstance(self._ubar, TensorParameter):
self.get_ubar = lambda: self._ubar()[::-1]
else:
dubar = self.input_discrete_tensor_fields[self._ubar]
self.get_ubar = lambda: dubar.buffers
[docs]
@debug
def get_work_properties(self):
requests = super().get_work_properties()
buffers = MemoryRequest.empty_like(
a=self.dvelocity.buffers[0], nb_components=1, backend=self.dvelocity.backend
)
requests.push_mem_request("Wtmp", buffers)
return requests
[docs]
def setup(self, work):
super().setup(work=work)
Wtmp = work.get_buffer(self, "Wtmp", handle=True)
self.tmp = Wtmp[-1]
[docs]
@classmethod
def supported_dimensions(cls):
return (3, 2)
[docs]
@classmethod
def supports_mpi(cls):
return True
@op_apply
def apply(self, **kwds):
super().apply(**kwds)
# Penalize velocity
self._compute_penalization()
for ubar, v in zip(self.get_ubar(), self.dvelocity.buffers):
v[...] += (v - ubar) * self.tmp
self.v_ghost_exchanger()